{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark import SparkConf, SparkContext\n",
    "\n",
    "# Only one SparkContext may be active at a time, so calling the constructor\n",
    "# again when this cell is re-run raises ValueError. getOrCreate() hands back\n",
    "# the running context instead; `conf` is only applied when a new context\n",
    "# actually has to be created.\n",
    "conf = SparkConf().setMaster('local').setAppName('EmailSpam')\n",
    "sc = SparkContext.getOrCreate(conf=conf)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.mllib.regression import LabeledPoint\n",
    "from pyspark.mllib.feature import HashingTF\n",
    "from pyspark.mllib.classification import LogisticRegressionWithSGD\n",
    "\n",
    "# Load both corpora as RDDs of text lines; the cells below treat every\n",
    "# line as one email.\n",
    "spam, ham = (sc.textFile(path) for path in ('spam.txt', 'ham.txt'))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [],
   "source": [
    "tf = HashingTF(numFeatures=10000)\n",
    "\n",
    "\n",
    "def featurize(email):\n",
    "    \"\"\"Split one email on single spaces and hash its tokens into a term-frequency vector.\"\"\"\n",
    "    return tf.transform(email.split(' '))\n",
    "\n",
    "\n",
    "# Apply the same featurization to both corpora.\n",
    "spamFeatures = spam.map(featurize)\n",
    "hamFeatures = ham.map(featurize)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "UnionRDD[6] at union at NativeMethodAccessorImpl.java:0"
      ]
     },
     "execution_count": 6,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# Attach class labels to the hashed vectors: 1 marks spam (positive), 0 marks ham (negative).\n",
    "positiveExamples = spamFeatures.map(lambda vec: LabeledPoint(1, vec))\n",
    "negativeExamples = hamFeatures.map(lambda vec: LabeledPoint(0, vec))\n",
    "\n",
    "# Merge both classes into a single training set and mark it for caching,\n",
    "# because later cells read it more than once.\n",
    "trainingData = positiveExamples.union(negativeExamples)\n",
    "trainingData.cache()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[LabeledPoint(1.0, (10000,[0,365,455,509,1320,1363,1583,2321,2403,3289,3342,4995,5336,5706,5831,6052,6300,6582,6744,8971,8977,9232,9604,9646,9878],[1.0,1.0,1.0,1.0,1.0,2.0,1.0,2.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])),\n",
       " LabeledPoint(1.0, (10000,[0,365,940,2220,3122,4460,4671,5336,5849,8479,9604],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0]))]"
      ]
     },
     "execution_count": 7,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# Peek at the first two labeled points of the combined set.\n",
    "trainingData.take(2)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Logistic Regression"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Fit a logistic-regression classifier on the labeled training set.\n",
    "# NOTE(review): LogisticRegressionWithSGD is deprecated since Spark 2.0;\n",
    "# consider LogisticRegressionWithLBFGS when upgrading Spark.\n",
    "model = LogisticRegressionWithSGD.train(trainingData)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Hash the test emails on the executors, exactly like the training data.\n",
    "# HashingTF buckets terms with Python's built-in hash(), and on Python 3\n",
    "# str hashes are randomized per process unless PYTHONHASHSEED is pinned,\n",
    "# so a vector hashed in the driver may not line up with the buckets the\n",
    "# model was trained on.\n",
    "posEmail = 'O M G GET cheap stuff by sending money to ...'\n",
    "negEmail = 'Hi Dad, I started studying Spark the other ...'\n",
    "testTokens = sc.parallelize([posEmail.split(' '), negEmail.split(' ')], 1)\n",
    "\n",
    "# collect() preserves element order, so the vectors unpack as (positive, negative).\n",
    "posTest, negTest = tf.transform(testTokens).collect()\n",
    "\n",
    "print('Prediction for positive test example: %g' % model.predict(posTest))\n",
    "print('Prediction for negative test example: %g' % model.predict(negTest))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Creating vectors"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [],
   "source": [
    "from numpy import array\n",
    "from pyspark.mllib.linalg import Vectors\n",
    "\n",
    "# Dense vectors: a plain NumPy array, and an MLlib dense vector holding the same values.\n",
    "denseVec1 = array([1.0, 2.0, 3.0])\n",
    "denseVec2 = Vectors.dense([1.0, 2.0, 3.0])\n",
    "\n",
    "# Sparse vectors of size 4 with entries at indices 0 and 2, given first as\n",
    "# an {index: value} dict and then as parallel index / value lists.\n",
    "sparseVec1 = Vectors.sparse(4, {0: 1.0, 2: 2.0})\n",
    "sparseVec2 = Vectors.sparse(4, [0, 2], [1.0, 2.0])"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Using HashingTF"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "SparseVector(10000, {7772: 2.0, 9657: 1.0})"
      ]
     },
     "execution_count": 17,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# Hash a toy sentence into a 10000-bucket term-frequency vector.\n",
    "sentence = 'hello hello world'\n",
    "words = sentence.split()\n",
    "\n",
    "# Rebinds `tf` to a fresh HashingTF with the same number of buckets as before.\n",
    "tf = HashingTF(numFeatures=10000)\n",
    "tf.transform(words)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# wholeTextFiles yields (filename, content) pairs; only the content is tokenized.\n",
    "# Python 3 removed tuple-unpacking lambda parameters (PEP 3113), so the pair\n",
    "# is reduced to its content with .values() before splitting on whitespace.\n",
    "rdd = sc.wholeTextFiles(\"data\").values().map(lambda text: text.split())\n",
    "\n",
    "# Passing an RDD of token lists makes HashingTF transform every document.\n",
    "tfVectors = tf.transform(rdd)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "conda_python3",
   "language": "python",
   "name": "conda_python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.4"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
